package com.aisun.flinkservice.tomessage;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;

/**
 * @Author: LiuYuanQi
 * @Date: 2020/4/29 16:12
 * 提取时间戳、生成水印
 */
@Slf4j
public class MessageWaterEmitter implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    Long currentMaxTimestamp = 0L;
    final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    /**
     * 定义生成watermark的逻辑
     * 默认100ms被调用一次
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    //定义如何提取timestamp
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        long timestamp = element.f1;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        log.info("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
        return timestamp;
    }

}